// Copyright 2021 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

package v1alpha1

import (
	"context"
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"math"
	"regexp"
	"strconv"
	"strings"

	cmmeta "github.com/cert-manager/cert-manager/pkg/apis/meta/v1"
	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/apimachinery/pkg/util/validation/field"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/webhook"

	"github.com/redpanda-data/redpanda/src/go/k8s/pkg/resources/featuregates"
	"github.com/redpanda-data/redpanda/src/go/k8s/pkg/utils"
)

const (
	kb = 1024
	mb = 1024 * kb
	gb = 1024 * mb

	minimumReplicas = 3

	// these constants can be removed after versions older that v22.3.1 are no longer supported
	defaultTopicReplicationKey              = "redpanda.default_topic_replications"
	transactionCoordinatorReplicationKey    = "redpanda.transaction_coordinator_replication"
	idAllocatorReplicationKey               = "redpanda.id_allocator_replication"
	defaultTopicReplicationNumber           = 3
	transactionCoordinatorReplicationNumber = 3
	idAllocatorReplicationNumber            = 3

	internalTopicReplicationFactorKey     = "redpanda.internal_topic_replication_factor"
	defaultInternalTopicReplicationNumber = 3

	noneAuthorizationMechanism         = "none"
	saslAuthorizationMechanism         = "sasl"
	mTLSIdentityAuthorizationMechanism = "mtls_identity"
	httpBasicAuthorizationMechanism    = "http_basic"

	defaultSchemaRegistryPort = 8081
)

// validHostnameSegment matches valid DNS name segments.
var validHostnameSegment = regexp.MustCompile(`^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])$`)

// AllowDownscalingInWebhook controls the downscaling alpha feature in the Cluster custom resource.
// Downscaling is not stable since nodeIDs are currently not reusable, so adding to a cluster a node
// that has previously been decommissioned can cause issues.
var AllowDownscalingInWebhook = false

// DefaultLicenseSecretKey is the default key required in secret referenced by `SecretKeyRef`.
var DefaultLicenseSecretKey = "license"

type resourceField struct {
	resources *corev1.ResourceRequirements
	path      *field.Path
}

type redpandaResourceField struct {
	resources *RedpandaResourceRequirements
	path      *field.Path
}

// kclient is controller-runtime client.
var kclient client.Client

// SetK8sClient sets kclient for testing
func SetK8sClient(c client.Client) {
	kclient = c
}

// SetupWebhookWithManager autogenerated function by kubebuilder
func (r *Cluster) SetupWebhookWithManager(mgr ctrl.Manager) error {
	// We should use lower level webhook package
	// But since we already have this webhook we just instantiate the client
	// REF https://github.com/kubernetes-sigs/kubebuilder/issues/1216
	kclient = mgr.GetClient()
	return ctrl.NewWebhookManagedBy(mgr).
		For(r).
		Complete()
}

//+kubebuilder:webhook:path=/mutate-redpanda-vectorized-io-v1alpha1-cluster,mutating=true,failurePolicy=fail,sideEffects=None,groups=redpanda.vectorized.io,resources=clusters,verbs=create;update,versions=v1alpha1,name=mcluster.kb.io,admissionReviewVersions={v1,v1beta1}

var _ webhook.Defaulter = &Cluster{}

func redpandaResourceFields(c *Cluster) redpandaResourceField {
	return redpandaResourceField{&c.Spec.Resources, field.NewPath("spec").Child("resources")}
}

func sidecarResourceFields(c *Cluster) []resourceField {
	var resources []resourceField

	if c.Spec.Sidecars.RpkStatus != nil && c.Spec.Sidecars.RpkStatus.Enabled {
		resources = append(resources, resourceField{c.Spec.Sidecars.RpkStatus.Resources, field.NewPath("spec").Child("resourcesRpkStatus")})
	}
	return resources
}

// Default implements defaulting webhook logic - all defaults that should be
// applied to cluster CRD after user submits it should be put in here
func (r *Cluster) Default() {
	log := ctrl.Log.WithName("Cluster.Default").WithValues("namespace", r.Namespace, "name", r.Name)
	log.Info("defaulting")
	if r.Spec.Configuration.SchemaRegistry != nil && r.Spec.Configuration.SchemaRegistry.Port == 0 {
		r.Spec.Configuration.SchemaRegistry.Port = defaultSchemaRegistryPort
	}

	if r.Spec.CloudStorage.Enabled && r.Spec.CloudStorage.CacheStorage != nil && r.Spec.CloudStorage.CacheStorage.Capacity.Value() == 0 {
		r.Spec.CloudStorage.CacheStorage.Capacity = resource.MustParse("20G")
	}

	r.setDefaultAdditionalConfiguration()
	if r.Spec.PodDisruptionBudget == nil {
		defaultMaxUnavailable := intstr.FromInt(1)
		r.Spec.PodDisruptionBudget = &PDBConfig{
			Enabled:        true,
			MaxUnavailable: &defaultMaxUnavailable,
		}
	}

	if r.Spec.LicenseRef != nil && r.Spec.LicenseRef.Key == "" {
		r.Spec.LicenseRef.Key = DefaultLicenseSecretKey
	}

	for i := range r.Spec.Configuration.KafkaAPI {
		if r.Spec.Configuration.KafkaAPI[i].AuthenticationMethod == "" {
			r.Spec.Configuration.KafkaAPI[i].AuthenticationMethod = noneAuthorizationMechanism
		}
	}

	if r.Spec.RestartConfig == nil {
		r.Spec.RestartConfig = &RestartConfig{
			DisableMaintenanceModeHooks:       nil,
			UnderReplicatedPartitionThreshold: 0,
		}
	}
}

func (r *Cluster) getDefaultAdditionalConfiguration() map[string]int {
	if featuregates.InternalTopicReplication(r.Spec.Version) {
		return map[string]int{
			defaultTopicReplicationKey:        defaultTopicReplicationNumber,
			internalTopicReplicationFactorKey: defaultInternalTopicReplicationNumber,
		}
	} else {
		return map[string]int{
			defaultTopicReplicationKey:           defaultTopicReplicationNumber,
			transactionCoordinatorReplicationKey: transactionCoordinatorReplicationNumber,
			idAllocatorReplicationKey:            idAllocatorReplicationNumber,
		}
	}
}

// setDefaultAdditionalConfiguration sets additional configuration fields based
// on the best practices
func (r *Cluster) setDefaultAdditionalConfiguration() {
	if r.Spec.Replicas != nil && *r.Spec.Replicas >= minimumReplicas {
		if r.Spec.AdditionalConfiguration == nil {
			r.Spec.AdditionalConfiguration = make(map[string]string)
		}

		for k, v := range r.getDefaultAdditionalConfiguration() {
			_, ok := r.Spec.AdditionalConfiguration[k]
			if !ok {
				r.Spec.AdditionalConfiguration[k] = strconv.Itoa(v)
			}
		}
	}
}

// TODO(user): change verbs to "verbs=create;update;delete" if you want to enable deletion validation.
//+kubebuilder:webhook:path=/validate-redpanda-vectorized-io-v1alpha1-cluster,mutating=false,failurePolicy=fail,sideEffects=None,groups=redpanda.vectorized.io,resources=clusters,verbs=create;update,versions=v1alpha1,name=vcluster.kb.io,admissionReviewVersions={v1,v1beta1}

var _ webhook.Validator = &Cluster{}

// ValidateCreate implements webhook.Validator so a webhook will be registered for the type
func (r *Cluster) ValidateCreate() error {
	log := ctrl.Log.WithName("Cluster.ValidateCreate").WithValues("namespace", r.Namespace, "name", r.Name)
	log.Info("validating create")

	allErrs := r.validateCommon(log)

	if len(allErrs) == 0 {
		return nil
	}

	return apierrors.NewInvalid(
		r.GroupVersionKind().GroupKind(),
		r.Name, allErrs)
}

// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (r *Cluster) ValidateUpdate(old runtime.Object) error {
	log := ctrl.Log.WithName("Cluster.ValidateUpdate").WithValues("namespace", r.Namespace, "name", r.Name)
	log.Info("validating update")

	// Don't validate if the cluster is being deleted.
	// After receiving delete, the controller will update the cluster CR to remove the finalzer.
	// If we validated in this scenario, some of the validations will fail, such that
	// the clientCACertRef can fail with secret not found if the referenced secret
	// has already been deleted as part of the cluster deletion.
	if !r.GetDeletionTimestamp().IsZero() {
		return nil
	}

	oldCluster := old.(*Cluster)
	allErrs := r.validateCommon(log)

	allErrs = append(allErrs, r.validateDownscaling(oldCluster)...)

	allErrs = append(allErrs, r.validateRedpandaCoreChanges(oldCluster)...)

	allErrs = append(allErrs, r.validateLicense(oldCluster)...)

	if len(allErrs) == 0 {
		return nil
	}

	return apierrors.NewInvalid(
		r.GroupVersionKind().GroupKind(),
		r.Name, allErrs)
}

func (r *Cluster) validateCommon(log logr.Logger) field.ErrorList {
	vcLog := log.WithName("validateCommon")
	var allErrs field.ErrorList
	allErrs = append(allErrs, r.validateScaling()...)
	allErrs = append(allErrs, r.validateKafkaListeners(vcLog)...)
	allErrs = append(allErrs, r.validateAdminListeners()...)
	allErrs = append(allErrs, r.validatePandaproxyListeners(vcLog)...)
	allErrs = append(allErrs, r.validateSchemaRegistryListener()...)
	allErrs = append(allErrs, r.checkCollidingPorts()...)
	allErrs = append(allErrs, r.validateRedpandaMemory()...)
	for _, rf := range sidecarResourceFields(r) {
		allErrs = append(allErrs, r.validateResources(rf)...)
	}
	allErrs = append(allErrs, r.validateRedpandaResources(redpandaResourceFields(r))...)
	allErrs = append(allErrs, r.validateArchivalStorage()...)
	allErrs = append(allErrs, r.validatePodDisruptionBudget()...)
	if featuregates.InternalTopicReplication(r.Spec.Version) {
		allErrs = append(allErrs, r.validateAdditionalConfiguration()...)
	}
	return allErrs
}

func (r *Cluster) validateScaling() field.ErrorList {
	var allErrs field.ErrorList
	if r.Spec.Replicas == nil {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("replicas"),
				r.Spec.Replicas,
				"replicas must be specified explicitly"))
	} else if *r.Spec.Replicas <= 0 {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("replicas"),
				r.Spec.Replicas,
				"downscaling is not allowed to less than 1 instance"))
	}

	return allErrs
}

func (r *Cluster) validateDownscaling(old *Cluster) field.ErrorList {
	var allErrs field.ErrorList
	if !AllowDownscalingInWebhook && old.Spec.Replicas != nil && r.Spec.Replicas != nil && *r.Spec.Replicas < *old.Spec.Replicas {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("replicas"),
				r.Spec.Replicas,
				"downscaling is not enabled: unset --allow-downscaling=false in the controller parameters to enable it"))
	}
	return allErrs
}

func (r *Cluster) validateAdminListeners() field.ErrorList {
	var allErrs field.ErrorList
	externalAdmin := r.AdminAPIExternal()
	targetAdminCount := 1
	if externalAdmin != nil {
		targetAdminCount = 2
	}
	if len(r.Spec.Configuration.AdminAPI) != targetAdminCount {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("adminApi"),
				r.Spec.Configuration.AdminAPI,
				"need exactly one internal API listener and up to one external"))
	}

	if externalAdmin != nil && externalAdmin.Port != 0 && (externalAdmin.Port < 30000 || externalAdmin.Port > 32768) {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("adminApi"),
				r.Spec.Configuration.AdminAPI,
				"external port must be in the following range: 30000-32768"))
	}
	if externalAdmin != nil && externalAdmin.External.PreferredAddressType != "" {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("adminApi"),
				r.Spec.Configuration.AdminAPI,
				"cannot have an preferred address type for admin listener"))
	}
	if externalAdmin != nil && externalAdmin.External.Bootstrap != nil {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("adminApi"),
				r.Spec.Configuration.AdminAPI,
				"bootstrap loadbalancer not available for http admin api"))
	}
	if externalAdmin != nil && externalAdmin.External.EndpointTemplate != "" {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("adminApi"),
				r.Spec.Configuration.AdminAPI,
				"cannot provide an endpoint template for admin listener"))
	}

	// for now only one listener can have TLS to be backward compatible with v1alpha1 API
	foundListenerWithTLS := false
	for i, p := range r.Spec.Configuration.AdminAPI {
		if p.TLS.Enabled {
			if foundListenerWithTLS {
				allErrs = append(allErrs,
					field.Invalid(field.NewPath("spec").Child("configuration").Child("adminApi").Index(i).Child("tls"),
						r.Spec.Configuration.AdminAPI[i].TLS,
						"only one listener can have TLS enabled"))
			}
			foundListenerWithTLS = true
		}
		// we need to run the validation on all listeners to also catch errors like !Enabled && RequireClientAuth
		allErrs = append(allErrs, validateAdminTLS(p.TLS, field.NewPath("spec").Child("configuration").Child("adminApi").Index(i).Child("tls"))...)
	}
	return allErrs
}

func (r *Cluster) validateKafkaListeners(l logr.Logger) field.ErrorList {
	log := l.WithName("validateKafkaListeners")
	var allErrs field.ErrorList
	if len(r.Spec.Configuration.KafkaAPI) == 0 {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("kafkaApi"),
				r.Spec.Configuration.KafkaAPI,
				"need at least one kafka api listener"))
	}

	var external *KafkaAPI
	var externalIdx int
	for i, p := range r.Spec.Configuration.KafkaAPI {
		if p.External.Enabled {
			if external != nil {
				allErrs = append(allErrs,
					field.Invalid(field.NewPath("spec").Child("configuration").Child("kafkaApi"),
						r.Spec.Configuration.KafkaAPI,
						"only one kafka api listener can be marked as external"))
			}
			external = &r.Spec.Configuration.KafkaAPI[i]
			externalIdx = i
		}
	}

	for i, p := range r.Spec.Configuration.KafkaAPI {
		tlsErrs := validateListener(
			p.TLS.Enabled,
			p.TLS.RequireClientAuth,
			p.TLS.IssuerRef,
			p.TLS.NodeSecretRef,
			nil,
			field.NewPath("spec").Child("configuration").Child("kafkaApi").Index(i).Child("tls"),
			&p.External,
			field.NewPath("spec").Child("configuration").Child("kafkaApi").Index(i).Child("external"),
			r.GetNamespace())
		allErrs = append(allErrs, tlsErrs...)

		switch r.Spec.Configuration.KafkaAPI[i].AuthenticationMethod {
		case noneAuthorizationMechanism:
		case saslAuthorizationMechanism:
		case mTLSIdentityAuthorizationMechanism:
		default:
			allErrs = append(allErrs,
				field.Invalid(field.NewPath("spec").Child("configuration").Child("kafkaApi").Index(i).Child("authenticationMethod"),
					r.Spec.Configuration.KafkaAPI[i].AuthenticationMethod,
					"authentication method is invalid. Valid options are: none, sasl, mtls_identity"))
		}
	}

	allErrs = append(allErrs,
		validateTLSRules(r.KafkaTLSListeners(), field.NewPath("spec").Child("configuration").Child("kafkaApi"))...)

	if !((len(r.Spec.Configuration.KafkaAPI) == 2 && external != nil) || (external == nil && len(r.Spec.Configuration.KafkaAPI) == 1)) {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("kafkaApi"),
				r.Spec.Configuration.KafkaAPI,
				"one internal listener and up to to one external kafka api listener is required"))
	}
	if external != nil && external.Port != 0 && (external.Port < 30000 || external.Port > 32768) {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("kafkaApi"),
				r.Spec.Configuration.KafkaAPI,
				"external port must be in the following range: 30000-32768"))
	}
	if external != nil && external.External.PreferredAddressType != "" && external.External.Subdomain != "" {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("kafkaApi"),
				r.Spec.Configuration.KafkaAPI,
				"cannot provide both a preferred address type and a subdomain"))
	}
	if external != nil && external.External.Bootstrap != nil && external.External.Bootstrap.Port == 0 {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("kafkaApi"),
				r.Spec.Configuration.KafkaAPI,
				"bootstrap port cannot be empty"))
	}
	//nolint:dupl // not identical
	if external != nil && external.External.EndpointTemplate != "" {
		if external.External.Subdomain == "" {
			allErrs = append(allErrs,
				field.Invalid(field.NewPath("spec").Child("configuration").Child("kafkaApi").Index(externalIdx).Child("external"),
					external.External,
					"endpointTemplate can only be used in combination with subdomain"))
		}

		err := checkValidEndpointTemplate(external.External.EndpointTemplate)
		if err != nil {
			log.Error(err, "Invalid endpoint template received", "template", external.External.EndpointTemplate)
			allErrs = append(allErrs,
				field.Invalid(field.NewPath("spec").Child("configuration").Child("kafkaApi").Index(externalIdx).Child("external").Child("endpointTemplate"),
					external.External.EndpointTemplate,
					fmt.Sprintf("template is invalid: %v", err)))
		}
	}

	return allErrs
}

func checkValidEndpointTemplate(tmpl string) error {
	// Using an example input to ensure that the template expression is allowed
	data := utils.NewEndpointTemplateData(0, "1.2.3.4")
	_, err := utils.ComputeEndpoint(tmpl, data)
	return err
}

//nolint:funlen,gocyclo // it's a sequence of checks
func (r *Cluster) validatePandaproxyListeners(l logr.Logger) field.ErrorList {
	var allErrs field.ErrorList
	var proxyExternal *PandaproxyAPI
	log := l.WithName("validatePandaproxyListeners")
	kafkaExternal := r.ExternalListener()
	p := r.Spec.Configuration.PandaproxyAPI
	for i := range r.Spec.Configuration.PandaproxyAPI {
		if !p[i].External.Enabled {
			continue
		}
		if proxyExternal != nil {
			allErrs = append(allErrs,
				field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i),
					r.Spec.Configuration.PandaproxyAPI[i],
					"only one pandaproxy api listener can be marked as external"))
		}
		proxyExternal = &r.Spec.Configuration.PandaproxyAPI[i]
		if proxyExternal.Port != 0 && (proxyExternal.Port < 30000 || proxyExternal.Port > 32768) {
			allErrs = append(allErrs,
				field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i),
					r.Spec.Configuration.PandaproxyAPI[i],
					"external port must be in the following range: 30000-32768"))
		}
		if proxyExternal.External.PreferredAddressType != "" {
			allErrs = append(allErrs,
				field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i).Child("external").Child("preferredAddressType"),
					r.Spec.Configuration.PandaproxyAPI[i].External.PreferredAddressType,
					"cannot have a pandaproxy external preferred address type"))
		}
		if proxyExternal.External.Bootstrap != nil {
			allErrs = append(allErrs,
				field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i),
					r.Spec.Configuration.PandaproxyAPI[i],
					"bootstrap loadbalancer not available for pandaproxy"))
		}
		if (kafkaExternal == nil || !kafkaExternal.External.Enabled) && proxyExternal.External.Enabled {
			allErrs = append(allErrs,
				field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i),
					r.Spec.Configuration.PandaproxyAPI[i],
					"cannot have a pandaproxy external listener without a kafka external listener"))
		}
		if kafkaExternal == nil && proxyExternal.External.Subdomain != "" {
			allErrs = append(allErrs,
				field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i),
					r.Spec.Configuration.PandaproxyAPI[i],
					"kafka external listener is empty but must specify the same sudomain as that of the external pandaproxy"))
		}
		if kafkaExternal != nil && kafkaExternal.External.Subdomain != proxyExternal.External.Subdomain {
			allErrs = append(allErrs,
				field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i),
					r.Spec.Configuration.PandaproxyAPI[i],
					"sudomain of external pandaproxy must be the same as kafka's"))
		}
		//nolint:dupl // not identical
		if kafkaExternal != nil && proxyExternal.External.EndpointTemplate != "" {
			if proxyExternal.External.Subdomain == "" {
				allErrs = append(allErrs,
					field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i).Child("external"),
						proxyExternal.External,
						"endpointTemplate can only be used in combination with subdomain"))
			}

			err := checkValidEndpointTemplate(proxyExternal.External.EndpointTemplate)
			if err != nil {
				log.Error(err, "Invalid endpoint template received", "template", proxyExternal.External.EndpointTemplate)
				allErrs = append(allErrs,
					field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i).
						Child("external").Child("endpointTemplate"),
						proxyExternal.External.EndpointTemplate,
						fmt.Sprintf("template is invalid: %v", err)))
			}
		}
		if proxyExternal.External.Ingress != nil && proxyExternal.External.Ingress.Endpoint != "" && !validHostnameSegment.MatchString(proxyExternal.External.Ingress.Endpoint) {
			allErrs = append(allErrs,
				field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i).
					Child("external").Child("ingress").Child("endpoint"),
					proxyExternal.External.Ingress.Endpoint,
					fmt.Sprintf("ingress endpoint for pandaproxy API does not match regexp %s", validHostnameSegment.String())))
		}
	}

	// for now only one listener can have TLS to be backward compatible with v1alpha1 API
	foundListenerWithTLS := false
	for i := range r.Spec.Configuration.PandaproxyAPI {
		if p[i].TLS.Enabled {
			if foundListenerWithTLS {
				allErrs = append(allErrs,
					field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i).Child("tls"),
						r.Spec.Configuration.PandaproxyAPI[i].TLS,
						"only one pandaproxy listener can have TLS enabled"))
			}
			foundListenerWithTLS = true
		}
		tlsErrs := validateListener(
			p[i].TLS.Enabled,
			p[i].TLS.RequireClientAuth,
			p[i].TLS.IssuerRef,
			p[i].TLS.NodeSecretRef,
			p[i].TLS.ClientCACertRef,
			field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i).Child("tls"),
			&p[i].External.ExternalConnectivityConfig,
			field.NewPath("spec").Child("configuration").Child("pandaproxyApi").Index(i).Child("external"),
			r.GetNamespace(),
		)
		allErrs = append(allErrs, tlsErrs...)
	}

	// If we have an external proxy listener and no other listeners, we're missing an internal one
	if proxyExternal != nil && len(r.Spec.Configuration.PandaproxyAPI) == 1 {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi"),
				r.Spec.Configuration.PandaproxyAPI,
				"an internal pandaproxy listener is required when an external one is provided"))
	}

	if !((len(r.Spec.Configuration.PandaproxyAPI) == 2 && proxyExternal != nil) || (proxyExternal == nil && len(r.Spec.Configuration.PandaproxyAPI) <= 1)) {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("pandaproxyApi"),
				r.Spec.Configuration.PandaproxyAPI,
				"up to one internal listener and no external listener, or one external and one internal listener for pandaproxy is allowed"))
	}

	return allErrs
}

func (r *Cluster) validateSchemaRegistryListener() field.ErrorList {
	var allErrs field.ErrorList
	schemaRegistry := r.Spec.Configuration.SchemaRegistry
	if schemaRegistry == nil {
		return allErrs
	}
	if schemaRegistry.TLS != nil {
		tlsErrs := validateListener(
			schemaRegistry.TLS.Enabled,
			schemaRegistry.TLS.RequireClientAuth,
			schemaRegistry.TLS.IssuerRef,
			schemaRegistry.TLS.NodeSecretRef,
			schemaRegistry.TLS.ClientCACertRef,
			field.NewPath("spec").Child("configuration").Child("schemaRegistry").Child("tls"),
			schemaRegistry.GetExternal(),
			field.NewPath("spec").Child("configuration").Child("schemaRegistry").Child("external"),
			r.GetNamespace(),
		)
		allErrs = append(allErrs, tlsErrs...)
	}
	if !r.IsSchemaRegistryExternallyAvailable() {
		return allErrs
	}
	kafkaExternal := r.ExternalListener()
	if kafkaExternal == nil || !kafkaExternal.External.Enabled {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("schemaRegistry"),
				r.Spec.Configuration.SchemaRegistry,
				"cannot have a schema registry external listener without a kafka external listener"))
	}
	if kafkaExternal == nil && schemaRegistry.External.Subdomain != "" {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("schemaRegistry").Child("external").Child("subdomain"),
				r.Spec.Configuration.SchemaRegistry.External.Subdomain,
				"the external kafka listener can't be empty if the registry subdomain is set"))
	}
	if kafkaExternal != nil && kafkaExternal.External.Subdomain != schemaRegistry.External.Subdomain {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("schemaRegistry").Child("external").Child("subdomain"),
				r.Spec.Configuration.SchemaRegistry.External.Subdomain,
				"sudomain of external schema registry must be the same as kafka's"))
	}
	if schemaRegistry.External.PreferredAddressType != "" {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("schemaRegistry").Child("external").Child("preferredAddressType"),
				r.Spec.Configuration.SchemaRegistry.External.PreferredAddressType,
				"cannot provide a preferred address type for schema registry"))
	}
	if schemaRegistry.External.Bootstrap != nil {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("schemaRegistry"),
				r.Spec.Configuration.SchemaRegistry.External,
				"bootstrap loadbalancer not available for schema registry"))
	}
	if schemaRegistry.External.EndpointTemplate != "" {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("schemaRegistry").Child("external").Child("endpointTemplate"),
				r.Spec.Configuration.SchemaRegistry.External.EndpointTemplate,
				"cannot provide an endpoint template for schema registry"))
	}
	if schemaRegistry.External.StaticNodePort && (schemaRegistry.Port < 30000 || schemaRegistry.Port > 32768) {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("schemaRegistry").Child("port"),
				r.Spec.Configuration.SchemaRegistry,
				"port must be in the range [30000-32768] when using a static node port"))
	}
	if schemaRegistry.External.Endpoint != "" && !validHostnameSegment.MatchString(schemaRegistry.External.Endpoint) {
		allErrs = append(allErrs,
			field.Invalid(field.NewPath("spec").Child("configuration").Child("schemaRegistry").Child("external").Child("endpoint"),
				r.Spec.Configuration.SchemaRegistry.External.Endpoint,
				fmt.Sprintf("endpoint for schema registry does not match regexp %s", validHostnameSegment.String())))
	}

	return allErrs
}

func (r *Cluster) validateResources(rf resourceField) field.ErrorList {
	if rf.resources == nil {
		return nil
	}
	var allErrs field.ErrorList

	// Memory limit (if set) cannot be lower than the requested
	if !rf.resources.Limits.Memory().IsZero() && rf.resources.Limits.Memory().Cmp(*rf.resources.Requests.Memory()) == -1 {
		allErrs = append(allErrs,
			field.Invalid(
				rf.path.Child("requests").Child("memory"),
				rf.resources.Requests.Memory(),
				"limits.memory < requests.memory; either increase the limit or remove it"))
	}

	// CPU limit (if set) cannot be lower than the requested
	if !rf.resources.Requests.Cpu().IsZero() && !rf.resources.Limits.Cpu().IsZero() &&
		rf.resources.Limits.Cpu().Cmp(*rf.resources.Requests.Cpu()) == -1 {
		allErrs = append(allErrs,
			field.Invalid(
				rf.path.Child("requests").Child("cpu"),
				rf.resources.Requests.Cpu(),
				"limits.cpu < requests.cpu; either increase the limit or remove it"))
	}

	return allErrs
}

func (r *Cluster) validateRedpandaResources(
	rf redpandaResourceField,
) field.ErrorList {
	allErrs := r.validateResources(resourceField{&rf.resources.ResourceRequirements, rf.path})

	// Memory redpanda (if set) must be less than or equal to request (if set)
	if !rf.resources.Requests.Memory().IsZero() && !rf.resources.RedpandaMemory().IsZero() && rf.resources.Requests.Memory().Cmp(*rf.resources.RedpandaMemory()) == -1 {
		allErrs = append(allErrs,
			field.Invalid(
				rf.path.Child("redpanda").Child("memory"),
				rf.resources.Requests.Memory(),
				"requests.memory < redpanda.memory; decrease or remove redpanda.memory"))
	}

	return allErrs
}

func (r *Cluster) validateLicense(old *Cluster) field.ErrorList {
	var allErrs field.ErrorList
	// Cluster has finalizers now, no validation if it is deleting
	if r.GetDeletionTimestamp() != nil {
		return allErrs
	}
	if l := r.Spec.LicenseRef; l != nil {
		secret, err := l.GetSecret(context.Background(), kclient)
		if err != nil {
			allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("licenseRef"), r.Spec.LicenseRef, err.Error()))
		}
		if secret != nil {
			if _, err := l.GetValue(secret, DefaultLicenseSecretKey); err != nil {
				allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("licenseRef"), r.Spec.LicenseRef, err.Error()))
			}
		}
	}
	if old.Spec.LicenseRef != nil && r.Spec.LicenseRef == nil {
		allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("licenseRef"), r.Spec.LicenseRef, "licenseRef must be set once configured"))
	}
	return allErrs
}

// validateRedpandaMemory verifies that memory limits are aligned with the minimal requirement of redpanda
// which is defined in `MinimumMemoryPerCore` constant
func (r *Cluster) validateRedpandaMemory() field.ErrorList {
	if r.Spec.Configuration.DeveloperMode {
		// for developer mode we don't enforce any memory limits
		return nil
	}
	var allErrs field.ErrorList

	// Ensure a requested 2GB of memory per core
	requests := r.Spec.Resources.Requests.DeepCopy()
	requests.Cpu().RoundUp(0)
	requestedCores := requests.Cpu().Value()
	if r.Spec.Resources.Requests.Memory().Value() < requestedCores*MinimumMemoryPerCore {
		allErrs = append(allErrs,
			field.Invalid(
				field.NewPath("spec").Child("resources").Child("requests").Child("memory"),
				r.Spec.Resources.Requests.Memory(),
				"requests.memory < 2Gi per core; either decrease requests.cpu or increase requests.memory"))
	}

	redpandaCores := r.Spec.Resources.RedpandaCPU().Value()
	minimumMemoryPerCore := int64(math.Floor(MinimumMemoryPerCore * RedpandaMemoryAllocationRatio))
	if !r.Spec.Resources.RedpandaMemory().IsZero() && r.Spec.Resources.RedpandaMemory().Value() < redpandaCores*minimumMemoryPerCore {
		allErrs = append(allErrs,
			field.Invalid(
				field.NewPath("spec").Child("resources").Child("redpanda").Child("memory"),
				r.Spec.Resources.Requests.Memory(),
				"redpanda.memory < 2Gi per core; either decrease redpanda.cpu or increase redpanda.memory"))
	}

	return allErrs
}

// validateRedpandaCoreChanges verifies that the number of cores assigned to each Redpanda node
// are not reduced during cluster updates
func (r *Cluster) validateRedpandaCoreChanges(old *Cluster) field.ErrorList {
	if r.Spec.Configuration.DeveloperMode {
		// for developer mode we don't enforce this rule
		return nil
	}
	var allErrs field.ErrorList

	oldCPURequest := old.Spec.Resources.RedpandaCPU()
	newCPURequest := r.Spec.Resources.RedpandaCPU()
	if oldCPURequest != nil && newCPURequest != nil {
		oldCores := oldCPURequest.Value()
		newCores := newCPURequest.Value()

		if newCores < oldCores {
			minAllowedCPU := (oldCores-1)*1000 + 1
			allErrs = append(allErrs,
				field.Invalid(
					field.NewPath("spec").Child("resources").Child("requests").Child("cpu"),
					r.Spec.Resources.Requests.Cpu(),
					fmt.Sprintf("CPU request must not be decreased; increase requests.cpu or redpanda.cpu to at least %dm", minAllowedCPU)))
		}
	}

	return allErrs
}

func validateAdminTLS(tlsConfig AdminAPITLS, path *field.Path) field.ErrorList {
	var allErrs field.ErrorList
	if tlsConfig.RequireClientAuth && !tlsConfig.Enabled {
		allErrs = append(allErrs,
			field.Invalid(
				path.Child("requireclientauth"),
				tlsConfig.RequireClientAuth,
				"Enabled has to be set to true for RequireClientAuth to be allowed to be true"))
	}
	return allErrs
}

func validateTLSRules(
	tlsListeners []ListenerWithName, path *field.Path,
) field.ErrorList {
	var allErrs field.ErrorList
	if len(tlsListeners) < 2 {
		return allErrs
	}
	if hasDifferentIssuers(tlsListeners[0].TLS, tlsListeners[1].TLS) {
		allErrs = append(allErrs,
			field.Invalid(
				path.Child("TLS").Child("IssuerRef"),
				tlsListeners[0].TLS.IssuerRef,
				"If two listeners have TLS enabled and issuerRef specified, this issuer must be the same for both"))
	}
	if hasDifferentNodeSecret(tlsListeners[0].TLS, tlsListeners[1].TLS) {
		allErrs = append(allErrs,
			field.Invalid(
				path.Child("TLS").Child("NodeSecretRef"),
				tlsListeners[0].TLS.IssuerRef,
				"If two listeners have TLS enabled and NodeSecretRef specified, it must be the same for both"))
	}
	return allErrs
}

func hasDifferentIssuers(listener1, listener2 KafkaAPITLS) bool {
	if listener1.IssuerRef == nil && listener2.IssuerRef == nil {
		// no issuer provided
		return false
	}
	if listener1.IssuerRef == nil || listener2.IssuerRef == nil {
		// one issuer set and another not set
		return true
	}
	return listener1.IssuerRef.Group != listener2.IssuerRef.Group ||
		listener1.IssuerRef.Kind != listener2.IssuerRef.Kind ||
		listener1.IssuerRef.Name != listener2.IssuerRef.Name
}

func hasDifferentNodeSecret(listener1, listener2 KafkaAPITLS) bool {
	if listener1.NodeSecretRef == nil && listener2.NodeSecretRef == nil {
		// no issuer provided
		return false
	}
	if listener1.NodeSecretRef == nil || listener2.NodeSecretRef == nil {
		// one issuer set and another not set
		return true
	}
	return listener1.NodeSecretRef.Namespace != listener2.NodeSecretRef.Namespace ||
		listener1.NodeSecretRef.Kind != listener2.NodeSecretRef.Kind ||
		listener1.NodeSecretRef.Name != listener2.NodeSecretRef.Name
}

func validateListener(
	tlsEnabled, requireClientAuth bool,
	issuerRef *cmmeta.ObjectReference,
	nodeSecretRef *corev1.ObjectReference,
	clientCACertRef *corev1.TypedLocalObjectReference,
	path *field.Path,
	external *ExternalConnectivityConfig,
	externalPath *field.Path,
	clusterNamespace string,
) field.ErrorList {
	var allErrs field.ErrorList
	if requireClientAuth && !tlsEnabled {
		allErrs = append(allErrs,
			field.Invalid(
				path.Child("requireClientAuth"),
				requireClientAuth,
				"Enabled has to be set to true for RequireClientAuth to be allowed to be true"))
	}
	if issuerRef != nil && nodeSecretRef != nil {
		allErrs = append(allErrs,
			field.Invalid(
				path.Child("nodeSecretRef"),
				nodeSecretRef,
				"Cannot provide both IssuerRef and NodeSecretRef"))
	}
	if tlsEnabled && external != nil && external.Enabled && external.Subdomain == "" {
		allErrs = append(allErrs,
			field.Invalid(
				externalPath.Child("subdomain"),
				external.Subdomain,
				"TLS requires specifying a subdomain"))
	}
	if !tlsEnabled || clientCACertRef == nil {
		return allErrs
	}

	return validateExternalCA(requireClientAuth, clientCACertRef, path, clusterNamespace)
}

func validateExternalCA(
	requireClientAuth bool,
	clientCACertRef *corev1.TypedLocalObjectReference,
	path *field.Path,
	clusterNamespace string,
) field.ErrorList {
	var allErrs field.ErrorList

	if !requireClientAuth {
		allErrs = append(allErrs,
			field.Invalid(
				path.Child("requireClientAuth"),
				requireClientAuth,
				"Enabled has to be set to true for RequireClientAuth if ClientCACertRef is set"))
	}

	if clientCACertRef.Name == "" {
		allErrs = append(allErrs,
			field.Invalid(
				path.Child("clientCACertRef").Child("name"),
				clientCACertRef,
				"Name must be provided if ClientCACertRef is set"))
	}

	if clientCACertRef.Kind != "" && !strings.EqualFold(clientCACertRef.Kind, "Secret") {
		allErrs = append(allErrs,
			field.Invalid(
				path.Child("clientCACertRef").Child("kind"),
				clientCACertRef,
				"Kind must be set to secret if set in ClientCACertRef"))
	}

	if len(allErrs) > 0 {
		return allErrs
	}

	secret := &corev1.Secret{}
	err := kclient.Get(context.TODO(), types.NamespacedName{Name: clientCACertRef.Name, Namespace: clusterNamespace}, secret)
	if err != nil {
		allErrs = append(allErrs,
			field.Invalid(
				path.Child("clientCACertRef"),
				clientCACertRef,
				"Failed to get secret: "+err.Error()))
		return allErrs
	}

	crt, found := secret.Data[cmmeta.TLSCAKey]
	if !found {
		allErrs = append(allErrs,
			field.Invalid(
				path.Child("clientCACertRef"),
				clientCACertRef,
				"ca.crt must be set in the client CA secret"))
		return allErrs
	}

	pb, _ := pem.Decode(crt)
	if pb == nil {
		allErrs = append(allErrs,
			field.Invalid(
				path.Child("clientCACertRef"),
				clientCACertRef,
				"Invalid certificate in the client CA secret"))
	} else {
		_, err := x509.ParseCertificate(pb.Bytes)
		if err != nil {
			allErrs = append(allErrs,
				field.Invalid(
					path.Child("clientCACertRef"),
					clientCACertRef,
					"Invalid certificate in the client CA secret: "+err.Error()))
		}
	}

	return allErrs
}

func (r *Cluster) validateArchivalStorage() field.ErrorList {
	var allErrs field.ErrorList
	if !r.Spec.CloudStorage.Enabled {
		return allErrs
	}
	if r.Spec.CloudStorage.CredentialsSource.IsDefault() && r.Spec.CloudStorage.AccessKey == "" {
		allErrs = append(allErrs,
			field.Invalid(
				field.NewPath("spec").Child("configuration").Child("cloudStorage").Child("accessKey"),
				r.Spec.CloudStorage.AccessKey,
				"AccessKey has to be provided for cloud storage to be enabled using default credentials source"))
	}
	if r.Spec.CloudStorage.Bucket == "" {
		allErrs = append(allErrs,
			field.Invalid(
				field.NewPath("spec").Child("configuration").Child("cloudStorage").Child("bucket"),
				r.Spec.CloudStorage.Bucket,
				"Bucket has to be provided for cloud storage to be enabled"))
	}
	if r.Spec.CloudStorage.Region == "" {
		allErrs = append(allErrs,
			field.Invalid(
				field.NewPath("spec").Child("configuration").Child("cloudStorage").Child("region"),
				r.Spec.CloudStorage.Region,
				"Region has to be provided for cloud storage to be enabled"))
	}
	if r.Spec.CloudStorage.CredentialsSource.IsDefault() && r.Spec.CloudStorage.SecretKeyRef.Name == "" {
		allErrs = append(allErrs,
			field.Invalid(
				field.NewPath("spec").Child("configuration").Child("cloudStorage").Child("secretKeyRef").Child("name"),
				r.Spec.CloudStorage.SecretKeyRef.Name,
				"SecretKeyRef name has to be provided for cloud storage to be enabled using default credentials source"))
	}
	if r.Spec.CloudStorage.SecretKeyRef.Name != "" && r.Spec.CloudStorage.SecretKeyRef.Namespace == "" {
		allErrs = append(allErrs,
			field.Invalid(
				field.NewPath("spec").Child("configuration").Child("cloudStorage").Child("secretKeyRef").Child("namespace"),
				r.Spec.CloudStorage.SecretKeyRef.Namespace,
				"SecretKeyRef namespace has to be defined when name is provided"))
	}
	return allErrs
}

func (r *Cluster) validatePodDisruptionBudget() field.ErrorList {
	var allErrs field.ErrorList
	if r.Spec.PodDisruptionBudget == nil {
		return allErrs
	}
	if (r.Spec.PodDisruptionBudget.MaxUnavailable != nil || r.Spec.PodDisruptionBudget.MinAvailable != nil) &&
		!r.Spec.PodDisruptionBudget.Enabled {
		allErrs = append(allErrs,
			field.Invalid(
				field.NewPath("spec").Child("podDisruptionBudget"),
				r.Spec.PodDisruptionBudget,
				"MaxUnavailable or MinAvailable is set but the podDisruptionBudget is not enabled"))
	}
	if r.Spec.PodDisruptionBudget.Enabled && r.Spec.PodDisruptionBudget.MaxUnavailable != nil && r.Spec.PodDisruptionBudget.MinAvailable != nil {
		allErrs = append(allErrs,
			field.Invalid(
				field.NewPath("spec").Child("podDisruptionBudget"),
				r.Spec.PodDisruptionBudget,
				"Cannot specify both MaxUnavailable and MinAvailable in PodDisruptionBudget"))
	}
	return allErrs
}

func (r *Cluster) validateAdditionalConfiguration() field.ErrorList {
	var allErrs field.ErrorList
	var idAllocatorReplication, transactionCoordinatorReplication, defaultTopicReplication int
	internalTopicReplicationFactor := defaultInternalTopicReplicationNumber
	minReplication := defaultInternalTopicReplicationNumber
	if r.Spec.Replicas != nil && int(*r.Spec.Replicas) < defaultInternalTopicReplicationNumber {
		minReplication = int(*r.Spec.Replicas)
	}
	for k, v := range r.Spec.AdditionalConfiguration {
		var err error
		switch k {
		// Would be good to make these checks issue warnings for their fields being
		// deprecated once controller-runtime supports adding warnings to the result.
		// see https://github.com/kubernetes-sigs/controller-runtime/pull/2014
		case idAllocatorReplicationKey:
			idAllocatorReplication, err = strconv.Atoi(v)
			if err != nil {
				allErrs = append(allErrs, field.Invalid(
					field.NewPath("spec").Child("additionalConfiguration").Child(k), v, "Must be an integer."))
				break
			}
			if idAllocatorReplication > minReplication {
				minReplication = idAllocatorReplication
			}
		case transactionCoordinatorReplicationKey:
			transactionCoordinatorReplication, err = strconv.Atoi(v)
			if err != nil {
				allErrs = append(allErrs, field.Invalid(
					field.NewPath("spec").Child("additionalConfiguration").Child(k), v, "Must be an integer."))
				break
			}
			if transactionCoordinatorReplication > minReplication {
				minReplication = transactionCoordinatorReplication
			}
		case defaultTopicReplicationKey:
			defaultTopicReplication, err = strconv.Atoi(v)
			if err != nil {
				allErrs = append(allErrs, field.Invalid(
					field.NewPath("spec").Child("additionalConfiguration").Child(k), v, "Must be an integer."))
				break
			}
			if defaultTopicReplication > minReplication {
				minReplication = defaultTopicReplication
			}
		case internalTopicReplicationFactorKey:
			internalTopicReplicationFactor, err = strconv.Atoi(v)
			if err != nil {
				allErrs = append(allErrs, field.Invalid(
					field.NewPath("spec").Child("additionalConfiguration").Child(k), v, "Must be an integer."))
			}
		}
	}
	if internalTopicReplicationFactor < minReplication {
		allErrs = append(allErrs, field.Invalid(
			field.NewPath("spec").Child("additionalConfiguration").Child(internalTopicReplicationFactorKey),
			internalTopicReplicationFactor,
			fmt.Sprintf("Cannot be reduced from %d", minReplication)))
	}
	return allErrs
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (r *Cluster) ValidateDelete() error {
	// this is a stub to implement the interface. We do not validate on delete.
	return nil
}

type listenersPorts struct {
	name                 string
	port                 int
	externalConnectivity bool
	externalPort         *int
}

// TODO move this to networking package
func (r *Cluster) checkCollidingPorts() field.ErrorList {
	var allErrs field.ErrorList
	ports := r.getAllPorts()

	for i := range ports {
		for j := len(ports) - 1; j > i; j-- {
			if ports[i].port == ports[j].port {
				allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("configuration", ports[i].name, "port"),
					ports[i].port,
					fmt.Sprintf("%s port collide with Spec.Configuration.%s Port", ports[i].name, ports[j].name)))
			}
			externalPortJ := ports[j].port + 1
			if ports[j].externalPort != nil {
				externalPortJ = *ports[j].externalPort
			}
			externalPortI := ports[i].port + 1
			if ports[i].externalPort != nil {
				externalPortI = *ports[i].externalPort
			}
			if ports[j].externalConnectivity && ports[i].port == externalPortJ {
				allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("configuration", ports[i].name, "port"),
					ports[i].port,
					fmt.Sprintf("%s port collide with external %s port", ports[i].name, ports[j].name)))
			}

			if ports[i].externalConnectivity && externalPortI == ports[j].port {
				allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("configuration", ports[i].name, "port"),
					ports[i].port,
					fmt.Sprintf("external %s port collide with Spec.Configuration.%s port", ports[i].name, ports[j].name)))
			}

			if ports[i].externalConnectivity && ports[j].externalConnectivity && externalPortI == externalPortJ {
				allErrs = append(allErrs, field.Invalid(field.NewPath("spec").Child("configuration", ports[i].name, "port"),
					ports[i].port,
					fmt.Sprintf("external %s port collide with external %s Port that is not defined in CR", ports[i].name, ports[j].name)))
			}
		}
	}

	return allErrs
}

func (r *Cluster) getAllPorts() []listenersPorts {
	ports := []listenersPorts{
		{
			name:                 "RPCApi",
			port:                 r.Spec.Configuration.RPCServer.Port,
			externalConnectivity: false,
		},
	}

	if internal := r.InternalListener(); internal != nil {
		externalListener := r.ExternalListener()
		var externalPort *int
		if externalListener != nil && externalListener.Port != 0 {
			externalPort = &externalListener.Port
		}
		ports = append(ports, listenersPorts{
			name:                 "kafkaApi",
			port:                 internal.Port,
			externalConnectivity: externalListener != nil,
			externalPort:         externalPort,
		})
	}

	if internal := r.AdminAPIInternal(); internal != nil {
		externalListener := r.AdminAPIExternal()
		var externalPort *int
		if externalListener != nil && externalListener.Port != 0 {
			externalPort = &externalListener.Port
		}
		ports = append(ports, listenersPorts{
			name:                 "adminApi",
			port:                 internal.Port,
			externalConnectivity: r.AdminAPIExternal() != nil,
			externalPort:         externalPort,
		})
	}

	if internal := r.PandaproxyAPIInternal(); internal != nil {
		externalListener := r.PandaproxyAPIExternal()
		var externalPort *int
		if externalListener != nil && externalListener.Port != 0 {
			externalPort = &externalListener.Port
		}
		ports = append(ports, listenersPorts{
			name:                 "pandaproxyApi",
			port:                 internal.Port,
			externalConnectivity: r.PandaproxyAPIExternal() != nil,
			externalPort:         externalPort,
		})
	}

	if r.Spec.Configuration.SchemaRegistry != nil {
		var externalConnectivity bool
		var externalPort *int
		if ext := r.Spec.Configuration.SchemaRegistry.External; ext != nil && ext.Enabled && ext.StaticNodePort {
			externalConnectivity = true
			externalPort = &r.Spec.Configuration.SchemaRegistry.Port
		}
		ports = append(ports, listenersPorts{
			name:                 "schemaRegistryApi",
			port:                 r.Spec.Configuration.SchemaRegistry.Port,
			externalConnectivity: externalConnectivity,
			externalPort:         externalPort,
		})
	}
	return ports
}
